package k2paltform.common.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Thin convenience wrapper around a Curator {@link CuratorFramework} client.
 *
 * <p>It offers simple CRUD helpers for znodes and lets callers attach
 * {@link IWatcher} objects ({@code NodeWatcher}, {@code PathWatcher} or
 * {@code TreeWatcher}) to a path. Registered watchers are tracked in a static
 * registry shared by every instance of this class.
 *
 * <p>Callers must invoke {@link #start()} before using the service and
 * {@link #close()} when done.
 */
public class ZookeeperService implements IService {

	public static final String ZOOKEEPER_PATH_SEPARATOR = "/";

	public static final Log LOG = LogFactory.getLog(ZookeeperService.class);

	/**
	 * Registry of watchers per path, shared by all instances. Structural updates are
	 * performed while holding the monitor of this map so that the "create set if
	 * absent / drop set when empty" steps are atomic.
	 */
	private static final Map<String, Set<IWatcher>> WATCHERS = new ConcurrentHashMap<>();

	private Configuration conf;
	private CuratorFramework client;

	/**
	 * Creates a service for the given connect string using the default namespace
	 * ({@code Constants.ZOOKEEPER_NAMESPACE}).
	 *
	 * @param zookeeperUrl ZooKeeper connect string
	 */
	public ZookeeperService(String zookeeperUrl) {
		this(zookeeperUrl, Constants.ZOOKEEPER_NAMESPACE);
	}

	/**
	 * Creates a service for the given connect string and namespace.
	 *
	 * @param zookeeperUrl ZooKeeper connect string
	 * @param namespace    namespace handed to the Curator builder
	 */
	public ZookeeperService(String zookeeperUrl, String namespace) {
		this.conf = new Configuration();
		conf.setProperty(ParamNames.ZOOKEEPER_URL, zookeeperUrl);
		conf.setProperty(ParamNames.ZOOKEEPER_NAMESPACE, namespace);
		this.client = buildClient(conf);
	}

	/**
	 * Creates a service from an existing configuration.
	 *
	 * <p>NOTE: when the configuration has no {@code ParamNames.ZOOKEEPER_URL} entry, an
	 * error is logged and the instance is left without a configuration or client;
	 * any later call that touches the client will then fail with a
	 * {@link NullPointerException}.
	 *
	 * @param conf configuration holding the ZooKeeper URL and namespace
	 */
	public ZookeeperService(Configuration conf) {
		if (!conf.containsKey(ParamNames.ZOOKEEPER_URL)) {
			LOG.error("Can't find zookeeper.url in configuration. Please check your configuration.");
			return;
		}
		this.conf = conf;
		this.client = buildClient(conf);
	}

	/** Builds (but does not start) a Curator client from the URL and namespace in {@code conf}. */
	private static CuratorFramework buildClient(Configuration conf) {
		return CuratorFrameworkFactory.builder()
				.connectString(conf.getString(ParamNames.ZOOKEEPER_URL))
				.namespace(conf.getString(ParamNames.ZOOKEEPER_NAMESPACE))
				.retryPolicy(new RetryNTimes(Constants.ZOOKEEPER_RETRY_TIME, 1000))
				.connectionTimeoutMs(Integer.MAX_VALUE).build();
	}

	/**
	 * Returns the live, shared watcher registry (path to registered watchers).
	 * The returned map is not a copy.
	 */
	public static Map<String, Set<IWatcher>> getWatchers() {
		return ZookeeperService.WATCHERS;
	}

	/** Starts the underlying client. Must be called before using the service. */
	public void start() {
		LOG.debug("Start Zookeeper service!");
		client.start();
	}

	/** @return {@code true} if the client state is {@code STOPPED} */
	public boolean isClosed() {
		return client.getState() == CuratorFrameworkState.STOPPED;
	}

	/** @return {@code true} if the client state is {@code STARTED} */
	public boolean isStarted() {
		return client.getState() == CuratorFrameworkState.STARTED;
	}

	/** @return the namespace stored in this service's configuration */
	public String getNamespace() {
		return conf.getString(ParamNames.ZOOKEEPER_NAMESPACE);
	}

	/** Closes the underlying client. Should be called once the service is no longer needed. */
	public void close() {
		LOG.debug("Close Zookeeper service!");
		client.close();
	}

	/**
	 * Registers a watcher on a path and wires it to the matching Curator cache
	 * (node, path-children or tree). A {@code null} watcher is ignored. A watcher of
	 * any other type is only recorded in the registry.
	 *
	 * <p>A failure to start the cache is logged; the cache is still handed to the
	 * watcher and the listener is still attached.
	 *
	 * @param path     znode path to watch
	 * @param watchObj watcher to notify
	 */
	public void register(final String path, final IWatcher watchObj) {
		if (watchObj == null) {
			return;
		}
		addWatcher(path, watchObj);

		if (watchObj instanceof NodeWatcher) {
			registerNodeWatcher(path, (NodeWatcher) watchObj);
		} else if (watchObj instanceof PathWatcher) {
			registerPathWatcher(path, (PathWatcher) watchObj);
		} else if (watchObj instanceof TreeWatcher) {
			registerTreeWatcher(path, (TreeWatcher) watchObj);
		}
	}

	/** Records the watcher in the shared registry, creating the per-path set on first use. */
	private static void addWatcher(String path, IWatcher watchObj) {
		synchronized (WATCHERS) {
			Set<IWatcher> watchers = WATCHERS.get(path);
			if (watchers == null) {
				// Concurrent set: the registry is exposed through getWatchers() and may be
				// read by other threads while it is being updated here.
				watchers = Collections.newSetFromMap(new ConcurrentHashMap<IWatcher, Boolean>());
				WATCHERS.put(path, watchers);
			}
			watchers.add(watchObj);
		}
	}

	/** Removes the watcher from the shared registry, dropping the per-path set once it is empty. */
	private static void removeWatcher(String path, IWatcher watchObj) {
		synchronized (WATCHERS) {
			Set<IWatcher> watchers = WATCHERS.get(path);
			if (watchers != null && watchers.remove(watchObj) && watchers.isEmpty()) {
				WATCHERS.remove(path);
			}
		}
	}

	/** Starts a {@link NodeCache} on the path and reloads the watcher on every node change. */
	private void registerNodeWatcher(final String path, final NodeWatcher watcher) {
		final NodeCache node = new NodeCache(client, path);
		try {
			node.start();
		} catch (Exception e) {
			LOG.error("Error to start node cache for " + path, e);
		}

		watcher.setNodeCache(node);

		node.getListenable().addListener(new NodeCacheListener() {
			@Override
			public void nodeChanged() throws Exception {
				watcher.clear();

				if (watcher.getNodeDataSign()) {
					// The cache reports no current data once the node has been deleted;
					// in that case the watcher simply stays cleared.
					ChildData current = node.getCurrentData();
					if (current != null) {
						watcher.load(current.getData());
					}
				} else {
					watcher.load(ZookeeperService.this.getChildrenData(path));
				}
			}
		});
	}

	/** Starts a {@link PathChildrenCache} on the path and forwards child add/remove/update events. */
	private void registerPathWatcher(final String path, final PathWatcher watcher) {
		PathChildrenCache pathCache = new PathChildrenCache(client, path, true);
		try {
			pathCache.start();
		} catch (Exception e) {
			LOG.error("Error to start path cache for " + path, e);
		}

		watcher.setPathCache(pathCache);

		pathCache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
				// Events that carry no node data are ignored.
				if (event.getData() == null || event.getData().getPath() == null) {
					return;
				}

				// The child name is the last segment of the event path.
				String childPath = event.getData().getPath();
				String childName = childPath.substring(childPath.lastIndexOf(ZOOKEEPER_PATH_SEPARATOR) + 1);

				PathChildrenCacheEvent.Type type = event.getType();
				if (type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
					watcher.add(childName, event.getData().getData());
				} else if (type == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
					watcher.remove(childName);
				} else if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
					watcher.update(childName, event.getData().getData());
				}
			}
		});
	}

	/** Starts a {@link TreeCache} on the path and forwards node add/remove/update events. */
	private void registerTreeWatcher(final String path, final TreeWatcher watcher) {
		TreeCache treeCache = new TreeCache(client, path);
		try {
			treeCache.start();
		} catch (Exception e) {
			LOG.error("Error to start tree cache for " + path, e);
		}
		watcher.setTreeCache(treeCache);

		treeCache.getListenable().addListener(new TreeCacheListener() {
			@Override
			public void childEvent(CuratorFramework curator, TreeCacheEvent event) throws Exception {
				// Events that carry no node data are ignored.
				if (event.getData() == null || event.getData().getPath() == null) {
					return;
				}

				// The name reported to the watcher is the event path with everything up to
				// and including the last separator of the watcher's base path cut off.
				String basePath = watcher.getBasePath();
				int pos = basePath.lastIndexOf(ZOOKEEPER_PATH_SEPARATOR);
				String childName = event.getData().getPath().substring(pos + 1);

				TreeCacheEvent.Type type = event.getType();
				if (type == TreeCacheEvent.Type.NODE_ADDED) {
					watcher.add(childName, event.getData().getData());
				} else if (type == TreeCacheEvent.Type.NODE_REMOVED) {
					watcher.remove(childName);
				} else if (type == TreeCacheEvent.Type.NODE_UPDATED) {
					watcher.update(childName, event.getData().getData());
				}
			}
		});
	}

	/**
	 * Removes a watcher from the registry and closes the cache attached to it.
	 * A {@code null} watcher is ignored; close failures are logged, not rethrown.
	 *
	 * @param path     znode path the watcher was registered on
	 * @param watchObj watcher to remove
	 */
	public void unregister(String path, IWatcher watchObj) {
		if (watchObj == null) {
			return;
		}

		removeWatcher(path, watchObj);

		if (watchObj instanceof NodeWatcher) {
			try {
				((NodeWatcher) watchObj).closeNodeCache();
			} catch (IOException e) {
				LOG.error("Error to close Node Watcher on " + path, e);
			}
		} else if (watchObj instanceof PathWatcher) {
			try {
				((PathWatcher) watchObj).closePathCache();
			} catch (IOException e) {
				LOG.error("Error to close Path Watcher on " + path, e);
			}
		} else if (watchObj instanceof TreeWatcher) {
			try {
				((TreeWatcher) watchObj).closeTreeCache();
			} catch (IOException e) {
				LOG.error("Error to close Tree Watcher on " + path, e);
			}
		}
	}

	/**
	 * Reads the data stored at a path.
	 *
	 * @param path znode path
	 * @return the node data, or {@code null} if the node does not exist
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public byte[] get(String path) throws Exception {
		if (!checkExists(path)) {
			return null;
		}
		return client.getData().forPath(path);
	}

	/**
	 * Lists the names of the children of a path.
	 *
	 * @param path znode path
	 * @return child names as returned by Curator
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public List<String> getChildren(String path) throws Exception {
		return client.getChildren().forPath(path);
	}

	/**
	 * Reads the data of every direct child of a path.
	 *
	 * @param path znode path
	 * @return map of child name to child data, or {@code null} if the path does not exist
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public Map<String, byte[]> getChildrenData(String path) throws Exception {
		if (!checkExists(path)) {
			return null;
		}

		Map<String, byte[]> results = new HashMap<String, byte[]>();
		for (String child : client.getChildren().forPath(path)) {
			results.put(child, client.getData().forPath(path + ZOOKEEPER_PATH_SEPARATOR + child));
		}
		return results;
	}

	/**
	 * Checks whether a znode exists.
	 *
	 * @param path znode path
	 * @return {@code true} if the node exists
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public boolean checkExists(String path) throws Exception {
		Stat stat = client.checkExists().forPath(path);
		return stat != null;
	}

	/**
	 * Sets the data of a node, creating it (via {@link #create(String, byte[])}) if it
	 * does not exist yet.
	 *
	 * @param path znode path
	 * @param data new node data
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public void update(String path, byte[] data) throws Exception {
		if (this.checkExists(path)) {
			client.setData().forPath(path, data);
		} else {
			this.create(path, data);
		}
	}

	/**
	 * Deletes a node and all of its children; does nothing if the node does not exist.
	 *
	 * @param path znode path
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public void delete(String path) throws Exception {
		if (this.checkExists(path)) {
			client.delete().deletingChildrenIfNeeded().forPath(path);
		}
	}

	/**
	 * Creates a node with the given data, creating missing parents.
	 *
	 * <p>An existing node at the path is deleted first, together with all of its children.
	 *
	 * @param path znode path
	 * @param data node data
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public void create(String path, byte[] data) throws Exception {
		if (client.checkExists().forPath(path) != null) {
			client.delete().deletingChildrenIfNeeded().forPath(path);
		}
		client.create().creatingParentsIfNeeded().forPath(path, data);
	}

	/**
	 * Creates one child node under {@code path} per map entry. Each child that already
	 * exists is deleted (with its children) and re-created.
	 *
	 * @param path  parent znode path
	 * @param datas map of child name to child data
	 * @throws Exception on any ZooKeeper/Curator error
	 */
	public void create(String path, Map<String, byte[]> datas) throws Exception {
		for (Map.Entry<String, byte[]> entry : datas.entrySet()) {
			String childPath = path + ZOOKEEPER_PATH_SEPARATOR + entry.getKey();
			if (client.checkExists().forPath(childPath) != null) {
				client.delete().deletingChildrenIfNeeded().forPath(childPath);
			}
			client.create().creatingParentsIfNeeded().forPath(childPath, entry.getValue());
		}
	}
}
